查看原文
其他

代达罗斯之殇-大数据领域小文件问题解决攻略

大数据真好玩 大数据真好玩 2022-09-10

点击上方蓝色字体,选择“设为星标”

回复”资源“获取更多惊喜

大数据技术与架构点击右侧关注,大数据开发领域最强公众号!

大数据真好玩点击右侧关注,大数据真好玩!


海量小文件问题是工业界和学术界公认的难题,大数据领域中的小文件问题,也是一个非常棘手的问题,仅次于数据倾斜问题,对于时间和性能能都是毁灭性打击。本文参考网上对于小文件问题的定义和常见系统的解决方案,给大家还原一个大数据系统中小文件问题的系统性解决方案。
本文针对目前大数据领域主要的主要框架,讲解了小文件产生的原因和一些解决办法,本文参考了众多其他作者的文章和博客,文末给出了连接,感谢各位前辈的付出。

小文件问题概述

衡量存储系统性能主要有两个关键指标,即IOPS和数据吞吐量。IOPS (Input/Output Per Second) 即每秒的输入输出量 ( 或读写次数 ) ,是衡量存储系统性能的主要指标之一。IOPS 是指单位时间内系统能处理的 I/O 请求数量,一般以每秒处理的 I/O 请求数量为单位, I/O 请求通常为读或写数据操作请求。随机读写频繁的应用,如 OLTP(OnlineTransaction Processing) ,IOPS 是关键衡量指标。另一个重要指标是数据吞吐量(Throughput),指单位时间内可以成功传输的数据数量。对于大量顺序读写的应用,如VOD(VideoOn Demand),则更关注吞吐量指标。
我们的存储磁盘最适合顺序的大文件I/O读写模式,非常不适合随机的小文件I/O读写模式,这是磁盘文件系统在海量小文件应用下性能表现不佳的根本原因。磁盘文件系统的设计大多都侧重于大文件,包括元数据管理、数据布局和I/O访问流程,另外VFS系统调用机制也非常不利于海量小文件,这些软件层面的机制和实现加剧了小文件读写的性能问题。
小文件过多产生的主要问题包括:
(1) 元数据管理低效
由于小文件数据内容较少,因此元数据的访问性能对小文件访问性能影响巨大。当前主流的磁盘文件系统基本都是面向大文件高聚合带宽设计的,而不是小文件的低延迟访问。磁盘文件系统中,目录项(dentry)、索引节点(inode)和数据(data)保存在存储介质的不同位置上。因此,访问一个文件需要经历至少3次独立的访问。这样,并发的小文件访问就转变成了大量的随机访问,而这种访问对于广泛使用的磁盘来说是非常低效的。同时,文件系统通常采用Hash树、 B+ 树或 B * 树来组织和索引目录,这种方法不能在数以亿计的大目录中很好的扩展,海量目录下检索效率会明显下降。正是由于单个目录元数据组织能力的低效,文件系统使用者通常被鼓励把文件分散在多层次的目录中以提高性能。然而,这种方法会进一步加大路径查询的开销。
(2) 数据布局低效
磁盘文件系统使用块来组织磁盘数据,并在inode中使用多级指针或hash树来索引文件数据块。数据块通常比较小,一般为1KB、2KB或4KB。当文件需要存储数据时,文件系统根据预定的策略分配数据块,分配策略会综合考虑数据局部性、存储空间利用效率等因素,通常会优先考虑大文件I/O带宽。对于大文件,数据块会尽量进行连续分配,具有比较好的空间局部性。对于小文件,尤其是大文件和小文件混合存储或者经过大量删除和修改后,数据块分配的随机性会进一步加剧,数据块可能零散分布在磁盘上的不同位置,并且会造成大量的磁盘碎片(包括内部碎片和外部碎片),不仅造成访问性能下降,还导致大量磁盘空间浪费。对于特别小的小文件,比如小于4KB,inode与数据分开存储,这种数据布局也没有充分利用空间局部性,导致随机I/O访问,目前已经有文件系统实现了data in inode。
(3) I/O访问流程复杂
Linux等操作系统采用VFS或类似机制来抽象文件系统的实现,提供标准统一访问接口和流程,它提供通用的Cache机制,处理文件系统相关的所有系统调用,与具体文件系统和其他内核组件(如内存管理)交互。VFS可以屏蔽底层文件系统实现细节,简化文件系统设计,实现对不同文件系统支持的扩展。VFS通用模型中有涉及四种数据类型:超级块对象(superblock object)、索引结点对象(inode object)、文件对象(file object)和目录项对象(dentry object),进程在进行I/O访问过程中需要频繁与它们交互(如下图所示)。

对于大多数分布式文件系统而言,通常将元数据与数据两者独立开来,即控制流与数据流进行分离,从而获得更高的系统扩展性和I/O并发性。数据和I/O访问负载被分散到多个物理独立的存储节点,从而实现系统的高扩展性和高性能,每个节点使用磁盘文件系统管理数据,比如XFS、EXT4、XFS等。因此,相对于磁盘文件系统而言,每个节点的小文件问题是相同的。由于分布式的架构,分布式文件系统中的网络通信、元数据服务MDC、Cache管理、数据布局和I/O访问模式等都会对IOPS/OPS性能产生影响,进一步加剧小文件问题。

小文件合并

小文件合并存储是目前优化LOSF问题最为成功的策略,已经被包括Facebook Haystack和淘宝TFS在内多个分布式存储系统采用。它通过多个逻辑文件共享同一个物理文件,将多个小文件合并存储到一个大文件中,实现高效的小文件存储。为什么这种策略对LOSF效果显著呢?
首先,减少了大量元数据。通过将大量的小文件存储到一个大文件中,从而把大量的小文件数据变成大文件数据,减少了文件数量,从而减少了元数据服务中的元数据数量,提高了元数据的检索和查询效率,降低了文件读写的I /O操作延时,节省了大量的数据传输时间。LOSF元数据开销所占比重大,大幅减少元数据,将直接导致性能的显著提升。合并后的大文件存储在磁盘文件系统之上,同时也大大降低了磁盘文件系统在元数据和I/O方面的压力,这点可以改善每个节点的存储性能。小文件的元数据和数据会一并存储在大文件中,并形成索引文件,访问时通过索引进行定位。索引文件采用预加载到Cache的策略,可以实现随机读写小文件只需要一次I/O。
其次,增加了数据局部性,提高了存储效率。磁盘文件系统或者分布式文件系统中,文件的元数据和数据存储在不同位置。采用合并存储机制后,小文件的元数据和数据可以一并连续存储大文件中,这大大增强了单个小文件内部的数据局部性。小文件合并过程中,可以利用文件之间的空间局部性、时间局部性以及关联,尽量将可能连续访问的小文件在大文件中进行连续存储,增强了小文件之间的数据局部性。这直接降低了磁盘上随机I/O比率,转换成了顺序I/O,能够有效提高I/O读写性能。另外,小文件单独存储会形成外部和内部碎片,而合并存储后存储碎片将大大降低,这极大提高了LOSF存储效率。
再次,简化了I/O访问流程。采用小文件合并存储后,I/O访问流程发生了极大变化,主要体现在存储节点磁盘文件系统上。根据之前的阐述,磁盘文件系统读写一个小文件,最大的系统消耗在open系统调用,需要进行路径查找do_path_lookup,将路径名进行分量解析,转换成对应文件在内核中内部表示。这个过程非常占用系统开销,尤其是深目录下的文件。而经过合并,很多小文件共享一个大文件,open操作转换成了开销小很多的seek操作,根据索引定位到大文件内部相应位置即可,也不需要在内核中创建相关VFS数据对象,这节省了原先绝大部分的系统开销。
大文件加上索引文件,小文件合并存储实际上相当于一个微型文件系统。这种机制对于WORM(Write Once Read Many)模式的分布式存储系统非常适合,而不适合允许改写和删除的存储系统。因为文件改写和删除操作,会造成大文件内部的碎片空洞,如果进行空间管理并在合适时候执行碎片整理,实现比较复杂而且产生额外开销。如果不对碎片进行处理,采用追加写的方式,一方面会浪费存储容量,另一方面又会破坏数据局部性,增加数据分布的随机性,导致读性能下降。此外,如果支持随机读写,大小文件如何统一处理,小文件增长成大文件,大文件退化为小文件,这些问题都是在实际处理时面临的挑战。

Hadoop小文件合并策略和方式

Hadoop中的小文件一般是指明显小于Hadoop的block size的文件。Hadoop的block size一般是64MB,128MB或者256MB,现在一般趋向于设置的越来越大。后文要讨论的内容会基于128MB,这也是CDH中的默认值。为了方便后面的讨论,Fayson这里假定如果文件大小小于block size的75%,则定义为小文件。但小文件不仅是指文件比较小,如果Hadoop集群中的大量文件略大于block size,同样也会存在小文件问题。
比如,假设block size是128MB,但加载到Hadoop的所有文件都是136MB,就会存在大量8MB的block。处理这种“小块”问题你可以调大block size来解决,但解决小文件问题却要复杂的多。
Hadoop 小文件是怎么来的
一个Hadoop集群中存在小文件问题是很正常的,可能的原因如下:
  1. 现在我们越来越多的将Hadoop用于(准)实时计算,在做数据抽取时处理的频率可能是每小时,每天,每周等,每次可能就只生成一个不到10MB的文件。

  2. 数据源有大量小文件,未做处理直接拷贝到Hadoop集群。

  3. MapReduce作业的配置未设置合理的reducer或者未做限制,每个reduce都会生成一个独立的文件。另外如果数据倾斜,导致大量的数据都shuffle到一个reduce,然后其他的reduce都会处理较小的数据量并输出小文件。

Hadoop的小文件问题主要是会对NameNode内存管理和MapReduce性能造成影响。Hadoop中的每个目录、文件和block都会以对象的形式保存在NameNode的内存中。根据经验每个对象在内存中大概占用150个字节。如果HDFS中保存2000万个文件,每个文件都在同一个文件夹中,而且每个文件都只有一个block,则NameNode需要6GB内存。如果只有这么点文件,这当然也没有什么问题,但是随着数据量的增长集群的扩容,最终会达到单台NameNode可以处理的文件(block)数量的上限。基于同样的假设,如果HDFS中保存的数据文件增长到10亿个,则NameNode需要300GB内存。以下Fayson带大家看看300GB内存的NameNode会有什么影响:
1.当NameNode重启时,它都需要从本地磁盘读取每个文件的元数据,意味着你要读取300GB数据到内存中,不可避免导致NameNode启动时间较长。
2.一般来说,NameNode会不断跟踪并检查每个数据块的存储位置。这是通过DataNode的定时心跳上报其数据块来实现的。数据节点需要上报的block越多,则也会消耗越多的网络带宽/时延。即使节点之间是高速网络(万兆/光纤),但不可避免的会带来一些不好的影响。
3.NameNode本身使用300G内存,相当于JVM你需要配置300GB的heap,对于JVM来说本来就存在稳定性的风险,比如GC时间较长。
所以优化和解决小文件问题基本是必须的。如果可以减少集群上的小文件数,则可以减少NameNode的内存占用,启动时间以及网络影响。

如果集群中有大量小文件,会降低MapReduce的处理性能,无论是Hive,Pig还是Java MapReduce,当然其实其他计算引擎比如Spark,Impala也会受到影响。第一个原因是大量小文件意味着大量的随机磁盘IO。磁盘IO通常是MapReduce性能的最大瓶颈之一,在HDFS中对于相同数量的数据,一次大的顺序读取往往优于几次随机读取的性能。如果可以将数据存储在较少,而更大的一些block中,可以降低磁盘IO的性能影响。
性能下降的第二个原因有点复杂,需要了解MapReduce如何处理文件和资源调度。当MapReduce任务启动时,每个数据block会被分配为一个map任务。HDFS中的每个文件至少是一个block。如果你有10000个文件,而且每个文件10MB,那么这个MapReduce作业会被分配为10000个map任务。一般来说每个Hadoop的map任务会运行在自己的JVM中,所以会带来10000个JVM的启动和关闭的资源开销。
集群的资源是有限的,为了方便计算,假设我们在YARN的配置中为每个NodeManager配置20个vcore,那么为了同时运行10000个mapper,你需要500台节点。大多数Hadoop集群都小于这个规模,所以一般情况下大量map任务可能只能排队等待ResourceManager来分配资源。如果你只有10台机器,那么总共只有200个vcore,这个排队的队列会较大,相应的处理时间也会变的较长。另外,往往你的集群中可能不止这一个作业。
如果10000个10MB文件换成800个128MB的文件,那么你就只需要800个map任务。相当于减少了一个数量级的JVM维护时间,同时也优化了磁盘IO。尽管一个单独的map任务处理一个128MB的文件比一个10MB的文件时间要慢,但是整个作业的总运行时间肯定可以降低一个数量级。
解决NameNode的内存问题
上面的内容提到过每个block的元数据都需要加载到NameNode的内存中,这导致一个Hadoop集群在NameNode中存储的对象是有上限的,并且对象太多会带来启动时间较长以及网络延迟的问题。常见的有两种解决方案,减少集群的NameNode中的对象数量,或者以某种方式让NameNode使用更多的“内存”但不会导致较长的启动时间,这就是Hadoop Archive(HAR)文件和NameNode联邦。
Hadoop Archive Files

Hadoop archive files通过将许多小文件打包到更大的HAR文件中来缓解NameNode内存问题,类似于Linux上的TAR文件。这样可以让NameNode只处理单个HAR文件,而不是数十个或数百个小文件。可以使用har://前缀而不是hdfs://来访问HAR文件中的文件。HAR文件是基于HDFS中已有的文件创建的。因此,HAR文件不仅可以合并从数据源抽取到HDFS中的数据,也可以合并通过正常的MapReduce处理创建的数据。HAR文件可以独立的用于解决小文件问题,除了HDFS,没有其他的依赖。
虽然HAR文件减少了NameNode中小文件对内存的占用,但访问HAR文件内容性能可能会更低。HAR文件仍然随机存储在磁盘上,并且读取HAR内的文件需要访问两个索引 - 一个用于NameNode找到HAR文件本身,一个用于在HAR文件内找到小文件的位置。在HAR中读取文件实际上可能比读取存储在HDFS上的相同文件慢。MapReduce作业的性能同样会受到影响,因为它仍旧会为每个HAR文件中的每个文件启动一个map任务。
所以这里我们需要有一个权衡(trade-off),HAR文件可以解决NameNode内存问题,但同时会降低读取性能。如果你的小文件主要用于存档,并且不经常访问,那么HAR文件是一个很好的解决方案。如果小文件经常要被读取或者处理,那么可能需要重新考虑解决方案。
NameNode联邦

NameNode联邦允许你在一个集群中拥有多个NameNode,每个NameNode都存储元数据对象的子集。这样可以让所有的元数据对象都不止存储在单个机器上,也消除了单个节点的内存限制,因为你可以扩容。这听上去是一个很美丽的方案,但其实它也有局限性。
NameNode联邦隔离了元数据对象 - 仅仅只有某一个NameNode知道某一个特定的元数据对象在哪里,意思就是说如果你想找到某个文件,你必须知道它是保存在哪个NameNode上的。如果你的集群中有多个租户和/或隔离的应用程序,那使用NameNode联邦是挺不错的,你可以通过租户或者应用程序来隔离元数据对象。但是,如果要在所有的应用程序之间共享数据,则该方法其实也并不是完美的。
由于NameNode联邦并不会改变集群中对象或者块的数量,所以它并没有解决MapReduce的性能问题。相反,联邦会增加Hadoop集群安装和维护的复杂度。所以我们说联邦可以解决小文件问题,倒不如说它提供了一种办法让你“隐藏”小文件。
解决MapReduce性能问题
根据之前讨论的内容,MapReduce性能问题主要是由随机磁盘IO和启动/管理太多的map任务组合引起的。解决方案似乎很明显 - 合并小文件,然而这个事往往说起来容易做起来难。以下讨论一下几种解决方案:
注:虽然本章名为解决MapReduce的性能问题,但其实同样也是为了解决NameNode的压力,以及解决其他计算引擎比如Impala/Spark的性能问题。
1.修改数据抽取方法/间隔
2.批量文件合并
3.Sequence文件
4.HBase
5.S3DistCp (如果使用Amazon EMR)
6.使用CombineFileInputFormat
7.通过Hive合并小文件
8.使用Hadoop的追加特性
修改数据抽取方法/间隔
解决小文件问题的最简单方法就是在生成阶段就进行杜绝。如果是由数据源产生大量小文件并直接拷贝到Hadoop,可以调研了解数据源是否能生成一些大文件,或者从数据源到HDFS的数据抽取过程中进行数据处理合并小文件。如果每小时只抽取10MB的数据,考虑是否改为每天一次,这样创建1个240MB的文件而不是24个10MB的文件。但是,你可能无法控制数据源的改动配合或业务对数据抽取间隔的需求,这样小文件问题无法避免,这时可能需要考虑其他的解决方案。
批量文件合并
当产生小文件是不可避免时,文件合并是常见的解决方案。使用这种方法,你可以定期运行一个MapReduce任务,读取某一个文件夹中的所有小文件,并将它们重写为较少数量的大文件。比如一个文件夹中有1000个文件,你可以在一个MapReduce任务中指定reduce的数量为5,这样1000个输入文件会被合并为5个文件。随后进行一些简单的HDFS文件/文件夹操作(将新文件覆盖回原目录),则可以将NameNode的内存使用减少到200分之1,并且可以提高以后MapReduce或其他计算引擎对同一数据处理的性能。
举例如果使用Pig,只需要2行包括load和store语句即可以实现。比如合并文本文件:

在Hive或Java MapReduce中实现同样比较容易。这些MapReduce作业运行同样需要集群资源,所以建议调度在生产系统非繁忙时间段执行。但是,应该定期执行这种合并的MapReduce作业,因为小文件随时或者几乎每天都可能产生。但这个合并程序需要有额外的逻辑来判断存在大量小文件的目录,或者你自己是知道哪些目录是存在大量小文件的。因为假如某个目录只有3个文件,运行合并作业远不如合并一个500个文件的文件夹的性能优势提升明显。
检查所有文件夹并确认哪些文件夹中的小文件需要合并,目前主要是通过自定义的脚本或程序,当然一些商业工具也能做,比如Pentaho可以迭代HDFS中的一组文件夹,找到最小合并要求的文件夹。这里还推荐另外一个开源工具File Crush
https://github.com/edwardcapriolo/filecrush/
File Crush没有专业支持,所以无法保证它可以与Hadoop的后续版本一直保持兼容。
批量合并文件的方法无法保留原始文件名,如果原始文件名对于你了解数据来源非常重要,则批量合并文件的方法也不适用。但一般来说,我们一般只会设计HDFS的各级目录的文件名,而不会细化到每个文件的名字,所以理论来说这种方法问题也不大。
Sequence文件

当需要维护原始文件名时,常见的方法是使用Sequence文件。在此解决方案中,文件名作为key保存在sequence文件中,然后文件内容会作为value保存。下图给出将一些小文件存储为sequence文件的示例:

如果一个sequence文件包含10000个小文件,则同时会包含10000个key在一个文件中。sequence文件支持块压缩,并且是可被拆分的。这样MapReduce作业在处理这个sequence文件时,只需要为每个128MB的block启动一个map任务,而不是每个小文件启动一个map任务。当你在同时抽取数百个或者数千个小文件,并且需要保留原始文件名时,这是非常不错的方案。
但是,如果你一次仅抽取少量的小文件到HDFS,则sequence文件的方法也不太可行,因为sequence文件是不可变的,无法追加。比如3个10MB文件将产生1个30MB的sequence文件,根据本文前面的定义,这仍然是一个小文件。另外一个问题是如果需要检索sequence文件中的文件名列表则需要遍历整个文件。
另外一个问题是Hive并不能较好的处理由该方法合并出来的sequence文件。Hive将value中的所有数据视为单行。这样会导致Hive查看这些数据不方便,因为以前小文件中的一行的所有数据也是Hive中的单行,即相当于只有一个字段。同时,Hive没办法访问这种sequence的key,即文件名。当然你可以自定义Hive serde来实现,不过这个超过了本文需要讨论的范围。
HBase

解决小文件问题,除了HDFS存储外,当然还可以考虑HBase列式存储。使用HBase可以将数据抽取过程从生成大量小HDFS文件更改为以逐条记录写入到HBase表。如果你对数据访问的需求主要是随机查找或者叫点查,则HBase是最好的选择。HBase在架构上就是为快速插入,存储大量数据,单个记录的快速查找以及流式数据处理而设计的。但如果你对数据访问的需求主要是全表扫描,则HBase不是最适合的。
可以基于HBase的表的数据创建Hive表,但是查询这种Hive表对于不同的查询类型性能会不一样。当查询单行或者范围查找时,Hive on HBase会表现不错,但是如果是全表扫描则效率比较低下,大多数分析查询比如带group by的语句都是全表扫描。
使用HBase,可以较好的应对实时数据写入以及实时查询的场景。但是如何分配和平衡HBase与集群上其他的组件的资源使用,以及HBase本身运维都会带来额外的运维管理成本。另外,HBase的性能主要取决于你的数据访问方式,所以在选择HBase解决小文件问题之前,应该进行仔细调研和设计。
S3DistCp (如果使用Amazon EMR)

此解决方案仅适用于Amazon EMR的用户,当然你在AWS中使用CDH也一样。Amazon EMR集群一般设计为短期存储,而在S3中持久化保存数据。即使使用S3,依旧存在小文件问题,所以这时需要选择S3DistCp。
S3DistCp是由Amazon提供的一个工具,用于分布式将S3中的数据拷贝到临时的HDFS或其他S3 bucket。这个工具可以通过配置groupBy和targetSize参数来将文件合并到一起。如果S3中存储了数千个EMR需要处理的小文件时,这个工具是一个不错的选择。S3DistCp通过连接许多小文件并导入到HDFS中,据报道,该方式的性能也非常优秀。
S3DistCp这个工具跟之前文章提到的批量合并文件的方法其实是类似的,只是说Amazon给你提供了一个现成的工具。
使用CombineFileInputFormat
CombineFileInputFormat是Hadoop提供的抽象类,它在MapReduce读取时合并小文件。合并的文件不会持久化到磁盘,它是在一个map任务中合并读取到的这些小文件。好处是MapReduce可以不用为每个小文件启动一个map任务,而且因为是自带的实现类,你不用额外将小文件先提前合并。这解决了MapReduce作业启动太多map任务的问题,但是因为作业仍然在读取多个小文件,随机磁盘IO依旧是一个问题。另外,CombineFileInputFormat大多数情况下都不会考虑data locality,往往会通过网络从其他节点拉取数据。
为了实现这个,需要为不同的文件类型编写Java代码扩展CombineFileInputFormat类。这样实现一个自定义的类后,就可以配置最大的split大小,然后单个map任务会读取小文件并进行合并直到满足这个大小。以下有一个示例参考:
http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
当然如果是Hive作业有简单的方式,直接配置以下参数即可:
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
set mapreduce.input.fileinputformat.split.maxsize=1073741824
set mapreduce.input.fileinputformat.split.minsize=1073741824
以上是以Hive的单个map作业合并小文件到1GB为示例。
注意以上无论是MapReduce代码实现方式还是Hive,因为合并的文件并不会持久化保存到磁盘,因此CombineFileInputFormat方式并不会缓解NameNode内存管理问题。只是提高MapReduce或者Hive作业的性能。
通过Hive合并小文件
如果你在使用Hive时因为“create table as”或“insert overwrite”语句输出了小文件,你可以通过设置一些参数来缓解。通过设置这些参数。Hive会在本身的SQL作业执行完毕后会单独起一个MapReduce任务来合并输出的小文件。
注意这个设置仅对Hive创建的文件生效,比如你使用Sqoop导数到Hive表,或者直接抽数到HDFS等,该方法都不会起作用。涉及的配置参数如下:

使用Hadoop的追加特性
有些人可能会问,为什么不使用Hadoop自带的Append特性来解决小文件问题,即当第一次输出是小文件时,后面的输出可以继续追加这些小文件,让小文件变成大文件,这听上去是个不错的建议,但其实做起来挺难的,因为Hadoop生态系统里的工具都不支持包括Flume,Sqoop,Pig,Hive,Spark,Impala和Java MapReduce。比如MapReduce任务有一个规定,输出结果目录必须是在之前不存在的。所以MapReduce作业肯定无法使用Append特性,由于Sqoop,Pig和Hive都使用了MapReduce,所以这些工具也不支持Append。Flume不支持Append主要是因为它假设经过一段时间比如几秒,多少字节,多少事件数或者不活动的秒数,Flume就会关闭文件而不再打开它。
如果你想使用Append来解决小文件问题,则你需要自己编写特定的程序来追加到现有的文件。另外,当集群中其他应用程序如果正在读取或处理这些需要追加的文件,你就不能使用自定义的MapReduce或者Spark程序来追加这些文件了。所以如果要使用这种方法,你最好还是谨慎考虑。
选择何种办法来解决小文件问题取决于各个方面,主要来自数据访问方式以及存储要求,具体包括:
1.小文件是在整个数据pipeline的哪个部分生成的?我们是要在抽数之前处理还是抽取到集群后处理?
2.是什么工具生成的小文件?可以通过调整工具的配置来减少小文件的数量吗?
3.企业的大数据团队的技能水平怎么样?他们有能力编写一些自定义程序来处理小文件或者抽数逻辑吗?他们未来有能力维护吗?
4.小文件生成的频率是多少?为了生成大文件,需要多久合并一次小文件?
5.什么工具会访问这些小文件?比如Hive,Impala,Spark或者其他程序?
6.对于一个生产集群来说的话,存在哪些时间窗口,集群有空余的资源来运行合并小文件的程序?
7.计算引擎访问数据时能接受怎样的延迟?这涉及我们考虑如何合并小文件,包括大小,压缩格式等。

Hive优化之小文件问题及其解决方案

使用sparkstreaming时,如果实时计算结果要写入到HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由sparkstreaming的微批处理模式和DStream(RDD)的分布式(partition)特性导致的,sparkstreaming为每个partition启动一个独立的线程来处理数据,一旦文件输出到HDFS,那么这个文件流就关闭了,再来一个batch的parttition任务,就再使用一个新的文件流,那么假设,一个batch为10s,每个输出的DStream有32个partition,那么一个小时产生的文件数将会达到(3600/10) * 32=11520个之多。众多小文件带来的结果是有大量的文件元信息,比如文件的location、文件大小、block number等需要NameNode来维护,NameNode会因此鸭梨山大。不管是什么格式的文件,parquet、text,、JSON或者 Avro,都会遇到这种小文件问题,这里讨论几种处理Sparkstreaming小文件的典型方法。
增加batch大小
这种方法很容易理解,batch越大,从外部接收的event就越多,内存积累的数据也就越多,那么输出的文件数也就回变少,比如上边的时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但别高兴太早,实时业务能等那么久吗,本来人家10s看到结果更新一次,现在要等快两分钟,是人都会骂娘。所以这种方法适用的场景是消息实时到达,但不想挤压在一起处理,因为挤压在一起处理的话,批处理任务在干等,这时就可以采用这种方法(是不是很像spark内部的pipeline模式,但是要注意区别哦)。
Coalesce和repartition
小文件的基数是:batch_number * partition_number,而第一种方法是减少batch_number,那么这种方法就是减少partition_number了,这个api不细说,就是减少初始的分区个数。看过spark源码的童鞋都知道,对于窄依赖,一个子RDD的partition规则继承父RDD,对于宽依赖(就是那些个叉叉叉ByKey操作),如果没有特殊指定分区个数,也继承自父rdd。那么初始的SourceDstream是几个partiion,最终的输出就是几个partition。所以Coalesce大法的好处就是,可以在最终要输出的时候,来减少一把partition个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个batch是不算做结束的。那么一个batch的处理时延必定增长,batch挤压会逐渐增大。
我们在真正落盘之前,可以对RDD做如下两种操作之一:
rdd.coalesce(1, true)
rdd.repartition(1)
Spark Streaming在将结果输出到HDFS时是按分区来的,分区越多,产生的小文件自然也越多。coalesce()算子就用来为RDD重新分区,其源码如下,位于RDD类中。
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]

// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
该算子主要参数有两个:numPartitions表示目标分区数,shuffle表示重分区过程中是否Shuffle。
如果shuffle参数为true的话,会从一个随机分区开始,利用HashPartitioner将所有数据重新均匀分布到numPartitions个分区上,返回一个由CoalescedRDD包装的ShuffleRDD,父子RDD之间为宽依赖。如果shuffle参数为false,就直接返回CoalescedRDD,其内部就只是简单地将多个分区的数据flatMap之后合并为一个分区,父子RDD之间为窄依赖。
由上面的分析可知,若numPartitions大于原分区数,那么shuffle参数一定要设为true才可以。若numPartitions小于原分区数,那么又有两种情况要考虑:
分区数之间的比例不太悬殊。比如原有1000个分区,减少到200个分区,这时可以将shuffle设为false,因为子RDD中的一个分区只对应父RDD的5个分区,压力不大。分区数之间的比例悬殊。比如原有500个分区,减少到1个分区,就要将shuffle设为true,保证生成CoalescedRDD之前的操作有足够的并行度,防止Executor出现单点问题。这也就是本节开头的做法了。
repartition()算子是借助coalesce()实现的,就是shuffle参数默认为true的版本。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
SparkStreaming外部来处理
我们既然把数据输出到hdfs,那么说明肯定是要用hive或者sparksql这样的“sql on hadoop”系统类进一步进行数据分析,而这些表一般都是按照半小时或者一小时、一天,这样来分区的(注意不要和sparkStreaming的分区混淆,这里的分区,是用来做分区裁剪优化的),那么我们可以考虑在SparkStreaming外再启动定时的批处理任务来合并SparkStreaming产生的小文件。这种方法不是很直接,但是却比较有用,“性价比”较高,唯一要注意的是,批处理的合并任务在时间切割上要把握好,搞不好就可能回去合并一个还在写入的SparkStreaming小文件。
自己调用foreach去append
SparkStreaming提供的foreach这个outout类api,可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个batch在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS上的文件不支持修改,但是很多都支持追加,那么每个batch的每个partition就对应一个输出文件,每次都去追加这个partition对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。

Spark SQL 小文件问题产生原因分析以及处理方案

在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:
1.Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行 2.容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。
此外,Spark在处理任务时,一个分区分配一个task进行处理,多个分区并行处理,虽然并行处理能够提高处理效率,但不是意味着task数越多越好。如果数据量不大,过多的task运行反而会影响效率。
下面通过一个例子,Spark SQL写数据时,导致产生分区数"剧增"的典型场景,通过分区数"剧增",以及Spark中task数和分区数的关系等,来倒推小文件过多的可能原因(这里的分区数是指生成的DataSet/RDD的分区数,不是Hive分区表的分区概念):
现象
1) 对表test_tab进行写入操作
2) t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
3) test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;
分析
1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响, 2)tmp的分区数主要受t1、t2以及union all的影响 3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?4)Spark SQL语句中的union all对应到DataSet中即为unionAll算子,底层调用union算子
在之前的文章《重要|Spark分区并行度决定机制》中已经对Spark RDD中的union算子对union产生的新的RDD的分区数是如何受被union的多个RDD的影响的,做过详细介绍,这里直接给出结论:
通过分析源码,RDD在调用union算子时,最终生成的RDD分区数分两种情况:
1)union的RDD分区器已定义并且它们的分区器相同
多个父RDD具有相同的分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的
2)不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和
同样的这种机制也可以套用到Spark SQL中的DataSet上,那么就很好解释了tmp的分区数为什么等于t1和t2的分区数的和。最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。
当然上述只是以Spark SQL中的一个场景阐述了小文件产生过多的原因之一(分区数过多)。在数仓建设中,产生小文件过多的原因有很多种,比如:
1.流式处理中,每个批次的处理执行保存操作也会产生很多小文件 2.为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多
那么如何解决这种小文件的问题呢?
  1. 通过repartition或coalesce算子控制最后的DataSet的分区数

  2. 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:

    INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
    INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
  3. 小文件定期合并 可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。

上述只是给出3种常见的解决办法,并且要结合实际用到的技术和场景去具体处理,比如对于HDFS小文件过多,也可以通过生成HAR 文件或者Sequence File来解决。

Flink小文件合并

Flink的filesystem connector支持写入hdfs,同时支持基于Checkpoint的滚动策略,每次做Checkpoint时将inprogress的文件变为正式文件,可供下游读取。由于并行度设置、数据量大小、Checkpoint配置的不同、分区的选择,都有可能导致产生大量的小文件,这对hdfs产生很大影响。但是可以通过一些手段来减少小文件,本文主要探讨一些filesystem connector支持的partition commit policy,通过自定义policy来合并小文件。
自定义 PartitionCommitPolicy
一个典型的 Parquet 文件合并的案例代码如下。我们可以通过 sink.partition-commit.policy.kind 参数进行配置:
package me.lmagics.flinkexp.hiveintegration.util;

import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

@Override
public void commit(Context context) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
String partitionPath = context.partitionPath().getPath();

List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
LOGGER.info("{} files in path {}", files.size(), partitionPath);

MessageType schema = getParquetSchema(files, conf);
if (schema == null) {
return;
}
LOGGER.info("Fetched parquet schema: {}", schema.toString());

Path result = merge(partitionPath, schema, files, fs);
LOGGER.info("Files merged into {}", result.toString());
}

private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
List<Path> result = new ArrayList<>();

RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
while (dirIterator.hasNext()) {
LocatedFileStatus fileStatus = dirIterator.next();
Path filePath = fileStatus.getPath();
if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
result.add(filePath);
}
}

return result;
}

private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
if (files.size() == 0) {
return null;
}

HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
ParquetFileReader reader = ParquetFileReader.open(inputFile);
ParquetMetadata metadata = reader.getFooter();
MessageType schema = metadata.getFileMetaData().getSchema();

reader.close();
return schema;
}

private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
.withType(schema)
.withConf(fs.getConf())
.withWriteMode(Mode.CREATE)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();

for (Path file : files) {
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withConf(fs.getConf())
.build();
Group data;
while((data = reader.read()) != null) {
writer.write(data);
}
reader.close();
}
writer.close();

for (Path file : files) {
fs.delete(file, false);
}

return mergeDest;
}
}
Flink 1.12 中增加了两个参数配置:
  • auto-compaction 是否自动合并

  • compaction.file-size: compact target file size, default is rolling-file-size 合并后文件大小

增加了CompactFileWriter,并将之前的StreamingFileWriter变成了AbstractStreamingWriter抽象类。在notifyCheckpointComplete方法中像下游发送EndCheckpoint对象
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
output.collect(new StreamRecord<>(new EndCheckpoint(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks())));
}
具体可参考 Flink 1.12 的更新日志:
  • 在 FileSystem/Hive connector 的流式写入中支持小文件合并 (FLINK-19345)

很多 bulk format,例如 Parquet,只有当写入的文件比较大时,才比较高效。当 checkpoint 的间隔比较小时,这会成为一个很大的问题,因为会创建大量的小文件。在 Flink 1.12 中,File Sink 增加了小文件合并功能,从而使得即使作业 checkpoint 间隔比较小时,也不会产生大量的文件。要开启小文件合并,可以在 FileSystem connector 中设置 auto-compaction = true 属性。
详情参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction

参考目录

https://blog.csdn.net/weixin_43228814/article/details/88883310
https://blog.csdn.net/xuehuagongzi000/article/details/105978128/
http://r6d.cn/WwLe
https://www.lmlphp.com/user/1210/article/item/17926/
http://r6d.cn/WwKQ
https://zhuanlan.zhihu.com/p/197431746
http://r6d.cn/WwKZ
https://www.jianshu.com/p/6647fdb5246d
https://blog.csdn.net/pop_xiaohao/article/details/111154188

版权声明:

本文为《大数据真好玩》整理,原作者独家授权。未经原作者允许转载追究侵权责任。编辑|冷眼丶微信公众号|大数据真好玩

文章不错?点个【在看】吧! 👇

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存